<!DOCTYPE html>



  


<html class="theme-next pisces use-motion" lang="zh-Hans">
<head>
  <meta charset="UTF-8">
  <meta http-equiv="X-UA-Compatible" content="IE=edge">
  <!-- No maximum-scale / user-scalable limit: capping the zoom level stops users from enlarging the page. -->
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <meta name="theme-color" content="#222">
  <!-- Exactly one referrer policy. When several are declared only the last one takes effect,
       so the page keeps the policy that was already winning (no-referrer). -->
  <meta name="referrer" content="no-referrer">








<meta http-equiv="Cache-Control" content="no-transform" />
<meta http-equiv="Cache-Control" content="no-siteapp" />
















  
  
  <!-- Stylesheets: order matters for the cascade (fancybox, Font Awesome, then the theme's main.css). -->
  <link rel="stylesheet" href="/lib/fancybox/source/jquery.fancybox.css?v=2.1.5">
  <link rel="stylesheet" href="/lib/font-awesome/css/font-awesome.min.css?v=4.6.2">
  <link rel="stylesheet" href="/css/main.css?v=5.1.4">

  <!-- Site icons. -->
  <link rel="apple-touch-icon" href="/images/apple-touch-icon-next.png?v=5.1.4" sizes="180x180">
  <link rel="icon" href="/images/favicon-32x32-next.png?v=5.1.4" type="image/png" sizes="32x32">
  <link rel="icon" href="/images/favicon-16x16-next.png?v=5.1.4" type="image/png" sizes="16x16">
  <link rel="mask-icon" href="/images/logo.svg?v=5.1.4" color="#222">





  <!-- Search-engine and social-card metadata for this post.
       The description is a plain-text summary: no HTML entities, no code-listing line numbers. -->
  <meta name="keywords" content="RocketMQ之旅">
  <meta name="description" content="消息接收：在BrokerController的registerProcessor()中，注册了很多网络处理器，其中SendMessageProcessor主要负责接收Producer发过来的消息。">
  <meta property="og:type" content="article">
  <meta property="og:title" content="RocketMQ之旅（七）broker处理消息">
  <!-- NOTE(review): "yoursite.com" looks like an unconfigured placeholder host; confirm the real site URL in the site configuration. -->
  <meta property="og:url" content="http://yoursite.com/2019/02/01/RocketMQ之旅（七）broker处理消息/">
  <meta property="og:site_name" content="矩阵编程">
  <meta property="og:description" content="消息接收：在BrokerController的registerProcessor()中，注册了很多网络处理器，其中SendMessageProcessor主要负责接收Producer发过来的消息。">
  <meta property="og:locale" content="zh-Hans">
  <meta property="og:updated_time" content="2019-10-24T16:22:02.000Z">
  <meta name="twitter:card" content="summary">
  <meta name="twitter:title" content="RocketMQ之旅（七）broker处理消息">
  <meta name="twitter:description" content="消息接收：在BrokerController的registerProcessor()中，注册了很多网络处理器，其中SendMessageProcessor主要负责接收Producer发过来的消息。">



<script type="text/javascript" id="hexo.configurations">
  // Page-level globals: `NexT` is reused if it already exists on window,
  // `CONFIG` carries the settings below as a plain object.
  var NexT = window.NexT || {};
  var CONFIG = {
    root: '/',
    scheme: 'Pisces',
    version: '5.1.4',
    sidebar: {
      position: 'left',
      display: 'post',
      offset: 12,
      b2t: false,
      scrollpercent: false,
      onmobile: false
    },
    fancybox: true,
    tabs: true,
    motion: {
      enable: true,
      async: false,
      transition: {
        post_block: 'fadeIn',
        post_header: 'slideDownIn',
        post_body: 'slideDownIn',
        coll_header: 'slideLeftIn',
        sidebar: 'slideUpIn'
      }
    },
    duoshuo: {
      userId: '0',
      author: '博主'
    },
    algolia: {
      applicationID: '',
      apiKey: '',
      indexName: '',
      hits: { per_page: 10 },
      // "${...}" below are literal placeholder texts inside ordinary strings, not template literals.
      labels: {
        input_placeholder: "Search for Posts",
        hits_empty: "We didn't find any results for the search: ${query}",
        hits_stats: "${hits} results found in ${time} ms"
      }
    }
  };
</script>



  <link rel="canonical" href="http://yoursite.com/2019/02/01/RocketMQ之旅（七）broker处理消息/"/>





  <title>RocketMQ之旅（七）broker处理消息 | 矩阵编程</title>
  





  <script type="text/javascript">
    // Keep an existing _hmt array if one was already defined, otherwise start empty.
    var _hmt = _hmt || [];
    // Inject hm.js from hm.baidu.com ahead of the first <script> element on the page.
    (function () {
      var firstScript = document.getElementsByTagName("script")[0];
      var loader = document.createElement("script");
      loader.src = "https://hm.baidu.com/hm.js?7d29e6fba8e07d1a8af2414a20e5f2a6";
      firstScript.parentNode.insertBefore(loader, firstScript);
    })();
  </script>




</head>

<body itemscope itemtype="http://schema.org/WebPage" lang="zh-Hans">

  
  
    
  

  <div class="container sidebar-position-left page-post-detail">
    <div class="headband"></div>

    <header id="header" class="header" itemscope itemtype="http://schema.org/WPHeader">
      <div class="header-inner"><div class="site-brand-wrapper">
  <!-- Site branding: the title links back to the home page; the subtitle is currently empty. -->
  <div class="site-meta">
    <div class="custom-logo-site-title">
      <a class="brand" href="/" rel="start">
        <span class="logo-line-before"><i></i></span>
        <span class="site-title">矩阵编程</span>
        <span class="logo-line-after"><i></i></span>
      </a>
    </div>
    <p class="site-subtitle"></p>
  </div>

  <!-- Menu toggle. The button contains only decorative bars, so it needs an explicit
       accessible name; type="button" states that it is an in-page action, not a submit. -->
  <div class="site-nav-toggle">
    <button type="button" aria-label="切换导航栏">
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
    </button>
  </div>
</div>

<!-- Primary site menu.
     Icon classes must be real Font Awesome names (the page loads Font Awesome 4.6.2);
     the menu labels themselves are not icon names. Icons are decorative, hence aria-hidden. -->
<nav class="site-nav">
  <ul id="menu" class="menu">
    <li class="menu-item menu-item-主页">
      <a href="/" rel="section">
        <i class="menu-item-icon fa fa-fw fa-home" aria-hidden="true"></i> <br>
        主页
      </a>
    </li>
    <li class="menu-item menu-item-categories">
      <a href="/categories/" rel="section">
        <i class="menu-item-icon fa fa-fw fa-th" aria-hidden="true"></i> <br>
        分类
      </a>
    </li>
    <li class="menu-item menu-item-所有文章">
      <a href="/archives/" rel="section">
        <i class="menu-item-icon fa fa-fw fa-archive" aria-hidden="true"></i> <br>
        所有文章
      </a>
    </li>
  </ul>
</nav>



 </div>
    </header>

    <main id="main" class="main">
      <div class="main-inner">
        <div class="content-wrap">
          <div id="content" class="content">
            

  <div id="posts" class="posts-expand">
    

  

  
  
  

  <article class="post post-type-normal" itemscope itemtype="http://schema.org/Article">
  
  
  
  <div class="post-block">
    <link itemprop="mainEntityOfPage" href="http://yoursite.com/2019/02/01/RocketMQ之旅（七）broker处理消息/">

    <span hidden itemprop="author" itemscope itemtype="http://schema.org/Person">
      <meta itemprop="name" content="matrix">
      <meta itemprop="description" content="">
      <meta itemprop="image" content="/images/avatar.gif">
    </span>

    <span hidden itemprop="publisher" itemscope itemtype="http://schema.org/Organization">
      <meta itemprop="name" content="矩阵编程">
    </span>

    
      <!-- Post header: title plus a metadata row (publish date | category | comment count).
           The icons and the "|" dividers are purely visual, so they are hidden from assistive technology. -->
      <header class="post-header">
        <h1 class="post-title" itemprop="name headline">RocketMQ之旅（七）broker处理消息</h1>

        <div class="post-meta">
          <span class="post-time">
            <span class="post-meta-item-icon">
              <i class="fa fa-calendar-o" aria-hidden="true"></i>
            </span>
            <span class="post-meta-item-text">发表于</span>
            <time title="创建于" itemprop="dateCreated datePublished" datetime="2019-02-01T21:00:12+08:00">
              2019-02-01
            </time>
          </span>

          <span class="post-category">
            <span class="post-meta-divider" aria-hidden="true">|</span>
            <span class="post-meta-item-icon">
              <i class="fa fa-folder-o" aria-hidden="true"></i>
            </span>
            <span class="post-meta-item-text">分类于</span>
            <span itemprop="about" itemscope itemtype="http://schema.org/Thing">
              <a href="/categories/RocketMQ之旅/" itemprop="url" rel="index">
                <span itemprop="name">RocketMQ之旅</span>
              </a>
            </span>
          </span>

          <span class="post-comments-count">
            <span class="post-meta-divider" aria-hidden="true">|</span>
            <span class="post-meta-item-icon">
              <i class="fa fa-comment-o" aria-hidden="true"></i>
            </span>
            <a href="/2019/02/01/RocketMQ之旅（七）broker处理消息/#comments" itemprop="discussionUrl">
              <span class="post-comments-count disqus-comment-count"
                    data-disqus-identifier="2019/02/01/RocketMQ之旅（七）broker处理消息/" itemprop="commentCount"></span>
            </a>
          </span>
        </div>
      </header>
    

    
    
    
    <div class="post-body" itemprop="articleBody">

      
      

      
        <h4 id="消息接收"><a href="#消息接收" class="headerlink" title="消息接收"></a>消息接收</h4><p>&emsp;&emsp;在<strong>BrokerController</strong>的<strong>registerProcessor()</strong>中，注册了很多网络处理器：<br><figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">registerProcessor</span><span class="params">()</span> </span>&#123;</span><br><span class="line">    <span class="comment">/**</span></span><br><span class="line"><span class="comment">        * SendMessageProcessor</span></span><br><span class="line"><span class="comment">        */</span></span><br><span class="line">    SendMessageProcessor sendProcessor = <span class="keyword">new</span> SendMessageProcessor(<span class="keyword">this</span>);</span><br><span class="line">    sendProcessor.registerSendMessageHook(sendMessageHookList);</span><br><span class="line">    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);</span><br><span class="line"></span><br><span class="line">    <span class="keyword">this</span>.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, <span class="keyword">this</span>.sendMessageExecutor);</span><br><span class="line">    <span 
class="keyword">this</span>.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, <span class="keyword">this</span>.sendMessageExecutor);</span><br><span class="line">    <span class="keyword">this</span>.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, <span class="keyword">this</span>.sendMessageExecutor);</span><br><span class="line">    <span class="keyword">this</span>.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, <span class="keyword">this</span>.sendMessageExecutor);</span><br><span class="line">    <span class="keyword">this</span>.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, <span class="keyword">this</span>.sendMessageExecutor);</span><br><span class="line">    <span class="keyword">this</span>.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, <span class="keyword">this</span>.sendMessageExecutor);</span><br><span class="line">    <span class="keyword">this</span>.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, <span class="keyword">this</span>.sendMessageExecutor);</span><br><span class="line">    <span class="keyword">this</span>.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, <span class="keyword">this</span>.sendMessageExecutor);</span><br><span class="line">    <span class="comment">// ...省略部分的注册逻辑...</span></span><br><span class="line">    <span class="comment">//...</span></span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure></p>
<p>其中<strong>SendMessageProcessor</strong>主要负责接收Producer发过来的消息，继承于NettyRequestProcessor，每个继承于NettyRequestProcessor的子类都需要实现<strong>processRequest()</strong>：<br><a id="more"></a><br><figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br></pre></td><td class="code"><pre><span class="line"><span class="meta">@Override</span></span><br><span class="line"><span class="function"><span class="keyword">public</span> RemotingCommand <span class="title">processRequest</span><span class="params">(ChannelHandlerContext ctx,</span></span></span><br><span class="line"><span class="function"><span class="params">                                        RemotingCommand request)</span> <span class="keyword">throws</span> RemotingCommandException </span>&#123;</span><br><span class="line">    SendMessageContext mqtraceContext;</span><br><span class="line">    <span class="keyword">switch</span> (request.getCode()) &#123;</span><br><span class="line">        <span class="keyword">case</span> RequestCode.CONSUMER_SEND_MSG_BACK:</span><br><span class="line">            <span class="keyword">return</span> <span 
class="keyword">this</span>.consumerSendMsgBack(ctx, request);</span><br><span class="line">        <span class="keyword">default</span>:</span><br><span class="line">            SendMessageRequestHeader requestHeader = parseRequestHeader(request);</span><br><span class="line">            <span class="keyword">if</span> (requestHeader == <span class="keyword">null</span>) &#123;</span><br><span class="line">                <span class="keyword">return</span> <span class="keyword">null</span>;</span><br><span class="line">            &#125;</span><br><span class="line"></span><br><span class="line">            mqtraceContext = buildMsgContext(ctx, requestHeader);</span><br><span class="line">            <span class="keyword">this</span>.executeSendMessageHookBefore(ctx, request, mqtraceContext);</span><br><span class="line"></span><br><span class="line">            RemotingCommand response;</span><br><span class="line">            <span class="keyword">if</span> (requestHeader.isBatch()) &#123;</span><br><span class="line">                response = <span class="keyword">this</span>.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);</span><br><span class="line">            &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">                response = <span class="keyword">this</span>.sendMessage(ctx, request, mqtraceContext, requestHeader);</span><br><span class="line">            &#125;</span><br><span class="line"></span><br><span class="line">            <span class="keyword">this</span>.executeSendMessageHookAfter(response, mqtraceContext);</span><br><span class="line">            <span class="keyword">return</span> response;</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure></p>
<ol>
<li><strong>consumerSendMsgBack()</strong> 用于处理Consumer回传的消息，这一点在<strong>RocketMQ之旅（四）client之Consumer</strong>中也提到过。</li>
<li><strong>sendMessage()</strong>和<strong>sendBatchMessage()</strong>就是核心的消息接收方法，前者处理单个消息，后者处理批量消息。</li>
</ol>
<p>先来看下<strong>consumerSendMsgBack()</strong>对于重新发回来的消息做了什么处理：<br><figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br><span class="line">45</span><br><span class="line">46</span><br><span class="line">47</span><br><span class="line">48</span><br><span class="line">49</span><br><span class="line">50</span><br><span class="line">51</span><br><span class="line">52</span><br><span class="line">53</span><br><span class="line">54</span><br><span class="line">55</span><br><span class="line">56</span><br><span class="line">57</span><br><span class="line">58</span><br><span 
class="line">59</span><br><span class="line">60</span><br><span class="line">61</span><br><span class="line">62</span><br><span class="line">63</span><br><span class="line">64</span><br><span class="line">65</span><br><span class="line">66</span><br><span class="line">67</span><br><span class="line">68</span><br><span class="line">69</span><br><span class="line">70</span><br><span class="line">71</span><br><span class="line">72</span><br><span class="line">73</span><br><span class="line">74</span><br><span class="line">75</span><br><span class="line">76</span><br><span class="line">77</span><br><span class="line">78</span><br><span class="line">79</span><br><span class="line">80</span><br><span class="line">81</span><br><span class="line">82</span><br><span class="line">83</span><br><span class="line">84</span><br><span class="line">85</span><br><span class="line">86</span><br><span class="line">87</span><br><span class="line">88</span><br><span class="line">89</span><br><span class="line">90</span><br><span class="line">91</span><br><span class="line">92</span><br><span class="line">93</span><br><span class="line">94</span><br><span class="line">95</span><br><span class="line">96</span><br><span class="line">97</span><br><span class="line">98</span><br><span class="line">99</span><br><span class="line">100</span><br><span class="line">101</span><br><span class="line">102</span><br><span class="line">103</span><br><span class="line">104</span><br><span class="line">105</span><br><span class="line">106</span><br><span class="line">107</span><br><span class="line">108</span><br><span class="line">109</span><br><span class="line">110</span><br><span class="line">111</span><br><span class="line">112</span><br><span class="line">113</span><br><span class="line">114</span><br><span class="line">115</span><br><span class="line">116</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> RemotingCommand 
<span class="title">consumerSendMsgBack</span><span class="params">(<span class="keyword">final</span> ChannelHandlerContext ctx, <span class="keyword">final</span> RemotingCommand request)</span></span></span><br><span class="line"><span class="function">    <span class="keyword">throws</span> RemotingCommandException </span>&#123;</span><br><span class="line">    <span class="comment">// 代码篇幅的问题省略了RemotingCommand组装的代码...</span></span><br><span class="line">    String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());</span><br><span class="line">    <span class="keyword">int</span> queueIdInt = Math.abs(<span class="keyword">this</span>.random.nextInt() % <span class="number">99999999</span>) % subscriptionGroupConfig.getRetryQueueNums();</span><br><span class="line"></span><br><span class="line">    <span class="keyword">int</span> topicSysFlag = <span class="number">0</span>;</span><br><span class="line">    <span class="keyword">if</span> (requestHeader.isUnitMode()) &#123;</span><br><span class="line">        topicSysFlag = TopicSysFlag.buildSysFlag(<span class="keyword">false</span>, <span class="keyword">true</span>);</span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line">    TopicConfig topicConfig = <span class="keyword">this</span>.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(</span><br><span class="line">        newTopic,</span><br><span class="line">        subscriptionGroupConfig.getRetryQueueNums(),</span><br><span class="line">        PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);</span><br><span class="line">    <span class="keyword">if</span> (<span class="keyword">null</span> == topicConfig) &#123;</span><br><span class="line">        response.setCode(ResponseCode.SYSTEM_ERROR);</span><br><span class="line">        response.setRemark(<span class="string">"topic["</span> + newTopic + <span class="string">"] not exist"</span>);</span><br><span class="line"> 
       <span class="keyword">return</span> response;</span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line">    <span class="keyword">if</span> (!PermName.isWriteable(topicConfig.getPerm())) &#123;</span><br><span class="line">        response.setCode(ResponseCode.NO_PERMISSION);</span><br><span class="line">        response.setRemark(String.format(<span class="string">"the topic[%s] sending message is forbidden"</span>, newTopic));</span><br><span class="line">        <span class="keyword">return</span> response;</span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line">    MessageExt msgExt = <span class="keyword">this</span>.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());</span><br><span class="line">    <span class="keyword">if</span> (<span class="keyword">null</span> == msgExt) &#123;</span><br><span class="line">        response.setCode(ResponseCode.SYSTEM_ERROR);</span><br><span class="line">        response.setRemark(<span class="string">"look message by offset failed, "</span> + requestHeader.getOffset());</span><br><span class="line">        <span class="keyword">return</span> response;</span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line">    <span class="keyword">final</span> String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);</span><br><span class="line">    <span class="keyword">if</span> (<span class="keyword">null</span> == retryTopic) &#123;</span><br><span class="line">        MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());</span><br><span class="line">    &#125;</span><br><span class="line">    msgExt.setWaitStoreMsgOK(<span class="keyword">false</span>);</span><br><span class="line"></span><br><span class="line">    <span class="keyword">int</span> delayLevel = requestHeader.getDelayLevel();</span><br><span 
class="line"></span><br><span class="line">    <span class="keyword">int</span> maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();</span><br><span class="line">    <span class="keyword">if</span> (request.getVersion() &gt;= MQVersion.Version.V3_4_9.ordinal()) &#123;</span><br><span class="line">        maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();</span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line">    <span class="keyword">if</span> (msgExt.getReconsumeTimes() &gt;= maxReconsumeTimes</span><br><span class="line">        || delayLevel &lt; <span class="number">0</span>) &#123;</span><br><span class="line">        newTopic = MixAll.getDLQTopic(requestHeader.getGroup());</span><br><span class="line">        queueIdInt = Math.abs(<span class="keyword">this</span>.random.nextInt() % <span class="number">99999999</span>) % DLQ_NUMS_PER_GROUP;</span><br><span class="line"></span><br><span class="line">        topicConfig = <span class="keyword">this</span>.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,</span><br><span class="line">            DLQ_NUMS_PER_GROUP,</span><br><span class="line">            PermName.PERM_WRITE, <span class="number">0</span></span><br><span class="line">        );</span><br><span class="line">        <span class="keyword">if</span> (<span class="keyword">null</span> == topicConfig) &#123;</span><br><span class="line">            response.setCode(ResponseCode.SYSTEM_ERROR);</span><br><span class="line">            response.setRemark(<span class="string">"topic["</span> + newTopic + <span class="string">"] not exist"</span>);</span><br><span class="line">            <span class="keyword">return</span> response;</span><br><span class="line">        &#125;</span><br><span class="line">    &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">        <span class="keyword">if</span> (<span class="number">0</span> == 
delayLevel) &#123;</span><br><span class="line">            delayLevel = <span class="number">3</span> + msgExt.getReconsumeTimes();</span><br><span class="line">        &#125;</span><br><span class="line"></span><br><span class="line">        msgExt.setDelayTimeLevel(delayLevel);</span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line">    MessageExtBrokerInner msgInner = <span class="keyword">new</span> MessageExtBrokerInner();</span><br><span class="line">    msgInner.setTopic(newTopic);</span><br><span class="line">    msgInner.setBody(msgExt.getBody());</span><br><span class="line">    msgInner.setFlag(msgExt.getFlag());</span><br><span class="line">    MessageAccessor.setProperties(msgInner, msgExt.getProperties());</span><br><span class="line">    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));</span><br><span class="line">    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(<span class="keyword">null</span>, msgExt.getTags()));</span><br><span class="line"></span><br><span class="line">    msgInner.setQueueId(queueIdInt);</span><br><span class="line">    msgInner.setSysFlag(msgExt.getSysFlag());</span><br><span class="line">    msgInner.setBornTimestamp(msgExt.getBornTimestamp());</span><br><span class="line">    msgInner.setBornHost(msgExt.getBornHost());</span><br><span class="line">    msgInner.setStoreHost(<span class="keyword">this</span>.getStoreHost());</span><br><span class="line">    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + <span class="number">1</span>);</span><br><span class="line"></span><br><span class="line">    String originMsgId = MessageAccessor.getOriginMessageId(msgExt);</span><br><span class="line">    MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? 
msgExt.getMsgId() : originMsgId);</span><br><span class="line"></span><br><span class="line">    PutMessageResult putMessageResult = <span class="keyword">this</span>.brokerController.getMessageStore().putMessage(msgInner);</span><br><span class="line">    <span class="keyword">if</span> (putMessageResult != <span class="keyword">null</span>) &#123;</span><br><span class="line">        <span class="keyword">switch</span> (putMessageResult.getPutMessageStatus()) &#123;</span><br><span class="line">            <span class="keyword">case</span> PUT_OK:</span><br><span class="line">                String backTopic = msgExt.getTopic();</span><br><span class="line">                String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);</span><br><span class="line">                <span class="keyword">if</span> (correctTopic != <span class="keyword">null</span>) &#123;</span><br><span class="line">                    backTopic = correctTopic;</span><br><span class="line">                &#125;</span><br><span class="line"></span><br><span class="line">                <span class="keyword">this</span>.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);</span><br><span class="line"></span><br><span class="line">                response.setCode(ResponseCode.SUCCESS);</span><br><span class="line">                response.setRemark(<span class="keyword">null</span>);</span><br><span class="line"></span><br><span class="line">                <span class="keyword">return</span> response;</span><br><span class="line">            <span class="keyword">default</span>:</span><br><span class="line">                <span class="keyword">break</span>;</span><br><span class="line">        &#125;</span><br><span class="line"></span><br><span class="line">        response.setCode(ResponseCode.SYSTEM_ERROR);</span><br><span class="line">        response.setRemark(putMessageResult.getPutMessageStatus().name());</span><br><span 
class="line">        <span class="keyword">return</span> response;</span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line">    response.setCode(ResponseCode.SYSTEM_ERROR);</span><br><span class="line">    response.setRemark(<span class="string">"putMessageResult is null"</span>);</span><br><span class="line">    <span class="keyword">return</span> response;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure></p>
<p>可以看到对于重试的消息，会以%RETRY%作为开头，以groupName作为结尾，重新命名它的topic。如果重试的次数达到默认16次以后，会将消息加入到DLQ死信队列中，该队列的topic以%DLQ%为开头，groupName为结尾。最后是组装MessageExtBrokerInner，使用messageStore持久化这条消息，这部分留待store模块再讨论。至于<strong>sendMessage()</strong>或者<strong>sendBatchMessage()</strong>，它们的基本逻辑也是类似的，所以这里不再继续深入。</p>
<h4 id="消息消费"><a href="#消息消费" class="headerlink" title="消息消费"></a>消息消费</h4><p>&emsp;&emsp;消息消费的核心处理器是<strong>PullMessageProcessor</strong>。<strong>PullMessageProcessor</strong>同样继承于NettyRequestProcessor，其内部核心的处理方法是<strong>processRequest()</strong>。由于<strong>processRequest()</strong>的代码实在太长，这里就不贴所有的代码了。大致上整个<strong>processRequest()</strong>分为：</p>
<ol>
<li>构建响应结果，读取消息头，检测消息的合法性、消息订阅者的合法性、broker的读写权限、topic的配置信息以及消息过滤器的构建等等。</li>
<li>从MessageStore中获取消息。</li>
</ol>
<p>我们来关注下从MessageStore中获取消息以后的行为：<br><figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br><span class="line">45</span><br><span class="line">46</span><br><span class="line">47</span><br><span class="line">48</span><br><span class="line">49</span><br><span class="line">50</span><br><span class="line">51</span><br><span class="line">52</span><br><span class="line">53</span><br><span class="line">54</span><br><span class="line">55</span><br><span class="line">56</span><br><span class="line">57</span><br><span class="line">58</span><br><span class="line">59</span><br><span 
class="line">60</span><br><span class="line">61</span><br><span class="line">62</span><br><span class="line">63</span><br><span class="line">64</span><br><span class="line">65</span><br><span class="line">66</span><br><span class="line">67</span><br><span class="line">68</span><br><span class="line">69</span><br><span class="line">70</span><br><span class="line">71</span><br><span class="line">72</span><br><span class="line">73</span><br><span class="line">74</span><br><span class="line">75</span><br><span class="line">76</span><br><span class="line">77</span><br><span class="line">78</span><br><span class="line">79</span><br><span class="line">80</span><br><span class="line">81</span><br><span class="line">82</span><br><span class="line">83</span><br><span class="line">84</span><br><span class="line">85</span><br><span class="line">86</span><br><span class="line">87</span><br><span class="line">88</span><br><span class="line">89</span><br><span class="line">90</span><br><span class="line">91</span><br><span class="line">92</span><br><span class="line">93</span><br><span class="line">94</span><br><span class="line">95</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> RemotingCommand <span class="title">processRequest</span><span class="params">(<span class="keyword">final</span> Channel channel, RemotingCommand request, <span class="keyword">boolean</span> brokerAllowSuspend)</span></span>&#123;</span><br><span class="line"><span class="comment">// 省略上面大部分代码...</span></span><br><span class="line"><span class="comment">// 这里主要展示从MessageStore获取到消息的后续行为</span></span><br><span class="line">    <span class="keyword">final</span> GetMessageResult getMessageResult =</span><br><span class="line">        <span class="keyword">this</span>.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),</span><br><span class="line">            
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);</span><br><span class="line">    <span class="keyword">if</span> (getMessageResult != <span class="keyword">null</span>) &#123;</span><br><span class="line">        response.setRemark(getMessageResult.getStatus().name());</span><br><span class="line">        responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());</span><br><span class="line">        responseHeader.setMinOffset(getMessageResult.getMinOffset());</span><br><span class="line">        responseHeader.setMaxOffset(getMessageResult.getMaxOffset());</span><br><span class="line"></span><br><span class="line">        <span class="keyword">if</span> (getMessageResult.isSuggestPullingFromSlave()) &#123;</span><br><span class="line">            responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());</span><br><span class="line">        &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);</span><br><span class="line">        &#125;</span><br><span class="line"></span><br><span class="line">        <span class="keyword">switch</span> (<span class="keyword">this</span>.brokerController.getMessageStoreConfig().getBrokerRole()) &#123;</span><br><span class="line">            <span class="keyword">case</span> ASYNC_MASTER:</span><br><span class="line">            <span class="keyword">case</span> SYNC_MASTER:</span><br><span class="line">                <span class="keyword">break</span>;</span><br><span class="line">            <span class="keyword">case</span> SLAVE:</span><br><span class="line">                <span class="keyword">if</span> (!<span class="keyword">this</span>.brokerController.getBrokerConfig().isSlaveReadEnable()) &#123;</span><br><span class="line">                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);</span><br><span 
class="line">                    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);</span><br><span class="line">                &#125;</span><br><span class="line">                <span class="keyword">break</span>;</span><br><span class="line">        &#125;</span><br><span class="line"></span><br><span class="line">        <span class="keyword">if</span> (<span class="keyword">this</span>.brokerController.getBrokerConfig().isSlaveReadEnable()) &#123;</span><br><span class="line">            <span class="comment">// consume too slow ,redirect to another machine</span></span><br><span class="line">            <span class="keyword">if</span> (getMessageResult.isSuggestPullingFromSlave()) &#123;</span><br><span class="line">                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());</span><br><span class="line">            &#125;</span><br><span class="line">            <span class="comment">// consume ok</span></span><br><span class="line">            <span class="keyword">else</span> &#123;</span><br><span class="line">                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());</span><br><span class="line">            &#125;</span><br><span class="line">        &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);</span><br><span class="line">        &#125;</span><br><span class="line"></span><br><span class="line">        <span class="keyword">switch</span> (getMessageResult.getStatus()) &#123;</span><br><span class="line">            <span class="keyword">case</span> FOUND:</span><br><span class="line">                response.setCode(ResponseCode.SUCCESS);</span><br><span class="line">                <span class="keyword">break</span>;</span><br><span class="line">            <span class="keyword">case</span> MESSAGE_WAS_REMOVING:</span><br><span class="line">                
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);</span><br><span class="line">                <span class="keyword">break</span>;</span><br><span class="line">            <span class="keyword">case</span> NO_MATCHED_LOGIC_QUEUE:</span><br><span class="line">            <span class="keyword">case</span> NO_MESSAGE_IN_QUEUE:</span><br><span class="line">                <span class="keyword">if</span> (<span class="number">0</span> != requestHeader.getQueueOffset()) &#123;</span><br><span class="line">                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);</span><br><span class="line"></span><br><span class="line">                    <span class="comment">// <span class="doctag">XXX:</span> warn and notify me</span></span><br><span class="line">                    log.info(<span class="string">"the broker store no queue data, fix the request offset &#123;&#125; to &#123;&#125;, Topic: &#123;&#125; QueueId: &#123;&#125; Consumer Group: &#123;&#125;"</span>,</span><br><span class="line">                        requestHeader.getQueueOffset(),</span><br><span class="line">                        getMessageResult.getNextBeginOffset(),</span><br><span class="line">                        requestHeader.getTopic(),</span><br><span class="line">                        requestHeader.getQueueId(),</span><br><span class="line">                        requestHeader.getConsumerGroup()</span><br><span class="line">                    );</span><br><span class="line">                &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">                    response.setCode(ResponseCode.PULL_NOT_FOUND);</span><br><span class="line">                &#125;</span><br><span class="line">                <span class="keyword">break</span>;</span><br><span class="line">            <span class="keyword">case</span> NO_MATCHED_MESSAGE:</span><br><span class="line">                response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);</span><br><span 
class="line">                <span class="keyword">break</span>;</span><br><span class="line">            <span class="keyword">case</span> OFFSET_FOUND_NULL:</span><br><span class="line">                response.setCode(ResponseCode.PULL_NOT_FOUND);</span><br><span class="line">                <span class="keyword">break</span>;</span><br><span class="line">            <span class="keyword">case</span> OFFSET_OVERFLOW_BADLY:</span><br><span class="line">                response.setCode(ResponseCode.PULL_OFFSET_MOVED);</span><br><span class="line">                <span class="comment">// <span class="doctag">XXX:</span> warn and notify me</span></span><br><span class="line">                log.info(<span class="string">"the request offset: &#123;&#125; over flow badly, broker max offset: &#123;&#125;, consumer: &#123;&#125;"</span>,</span><br><span class="line">                    requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());</span><br><span class="line">                <span class="keyword">break</span>;</span><br><span class="line">            <span class="keyword">case</span> OFFSET_OVERFLOW_ONE:</span><br><span class="line">                response.setCode(ResponseCode.PULL_NOT_FOUND);</span><br><span class="line">                <span class="keyword">break</span>;</span><br><span class="line">            <span class="keyword">case</span> OFFSET_TOO_SMALL:</span><br><span class="line">                response.setCode(ResponseCode.PULL_OFFSET_MOVED);</span><br><span class="line">                log.info(<span class="string">"the request offset too small. 
group=&#123;&#125;, topic=&#123;&#125;, requestOffset=&#123;&#125;, brokerMinOffset=&#123;&#125;, clientIp=&#123;&#125;"</span>,</span><br><span class="line">                    requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),</span><br><span class="line">                    getMessageResult.getMinOffset(), channel.remoteAddress());</span><br><span class="line">                <span class="keyword">break</span>;</span><br><span class="line">            <span class="keyword">default</span>:</span><br><span class="line">                <span class="keyword">assert</span> <span class="keyword">false</span>;</span><br><span class="line">                <span class="keyword">break</span>;</span><br><span class="line">        &#125;</span><br><span class="line">        <span class="comment">// 先省略下面部分代码...</span></span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure></p>
<p>上面部分只是截取了一个代码片段，根据MessageStore不同的返回结果构建不同的response响应参数：</p>
<ol>
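<li>如果当前Broker为slave且不可读（slaveReadEnable为false），会立即返回PULL_RETRY_IMMEDIATELY，并建议当前Consumer去master节点拉取消息。如果允许从slave读取消息，则根据MessageStore返回结果中的<strong>suggestPullingFromSlave</strong>来决定建议的节点：这个参数可以判断当前broker机器的可用内存是否大于剩余未拉取的消息大小，如果内存不足，broker会建议Consumer去<strong>whichBrokerWhenConsumeSlowly</strong>指定的机器上拉取消息，否则建议继续从<strong>brokerId</strong>指定的机器上拉取。</li>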
<li>根据MessageStore返回的拉取结果的状态，去设置response的code。比如状态为<strong>FOUND</strong>，即设置response为<strong>SUCCESS</strong>。</li>
</ol>
<p>当然<strong>processRequest()</strong>不止于此，根据上面response的不同code，broker会统计一些状态，再来看下后面部分的代码：<br><figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br><span class="line">45</span><br><span class="line">46</span><br><span class="line">47</span><br><span class="line">48</span><br><span class="line">49</span><br><span class="line">50</span><br><span class="line">51</span><br><span class="line">52</span><br><span class="line">53</span><br><span class="line">54</span><br><span class="line">55</span><br><span class="line">56</span><br><span class="line">57</span><br><span 
class="line">58</span><br><span class="line">59</span><br><span class="line">60</span><br><span class="line">61</span><br><span class="line">62</span><br><span class="line">63</span><br><span class="line">64</span><br><span class="line">65</span><br><span class="line">66</span><br><span class="line">67</span><br><span class="line">68</span><br><span class="line">69</span><br><span class="line">70</span><br><span class="line">71</span><br><span class="line">72</span><br><span class="line">73</span><br><span class="line">74</span><br><span class="line">75</span><br><span class="line">76</span><br><span class="line">77</span><br><span class="line">78</span><br><span class="line">79</span><br><span class="line">80</span><br><span class="line">81</span><br><span class="line">82</span><br><span class="line">83</span><br><span class="line">84</span><br><span class="line">85</span><br><span class="line">86</span><br><span class="line">87</span><br><span class="line">88</span><br><span class="line">89</span><br><span class="line">90</span><br><span class="line">91</span><br><span class="line">92</span><br><span class="line">93</span><br><span class="line">94</span><br><span class="line">95</span><br><span class="line">96</span><br><span class="line">97</span><br><span class="line">98</span><br><span class="line">99</span><br><span class="line">100</span><br><span class="line">101</span><br><span class="line">102</span><br><span class="line">103</span><br><span class="line">104</span><br><span class="line">105</span><br><span class="line">106</span><br><span class="line">107</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> RemotingCommand <span class="title">processRequest</span><span class="params">(<span class="keyword">final</span> Channel channel, RemotingCommand request, <span class="keyword">boolean</span> brokerAllowSuspend)</span></span>&#123;</span><br><span class="line">    <span 
class="comment">// 省略上面部分的代码...</span></span><br><span class="line">    <span class="keyword">if</span> (getMessageResult != <span class="keyword">null</span>) &#123;</span><br><span class="line">        <span class="comment">// 省略 设置response code的的代码逻辑...</span></span><br><span class="line">        <span class="keyword">switch</span> (response.getCode()) &#123;</span><br><span class="line">            <span class="keyword">case</span> ResponseCode.SUCCESS:</span><br><span class="line"></span><br><span class="line">                <span class="keyword">this</span>.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),</span><br><span class="line">                    getMessageResult.getMessageCount());</span><br><span class="line"></span><br><span class="line">                <span class="keyword">this</span>.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),</span><br><span class="line">                    getMessageResult.getBufferTotalSize());</span><br><span class="line"></span><br><span class="line">                <span class="keyword">this</span>.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());</span><br><span class="line">                <span class="keyword">if</span> (<span class="keyword">this</span>.brokerController.getBrokerConfig().isTransferMsgByHeap()) &#123;</span><br><span class="line">                    <span class="keyword">final</span> <span class="keyword">long</span> beginTimeMills = <span class="keyword">this</span>.brokerController.getMessageStore().now();</span><br><span class="line">                    <span class="keyword">final</span> <span class="keyword">byte</span>[] r = <span class="keyword">this</span>.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());</span><br><span class="line">    
                <span class="keyword">this</span>.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),</span><br><span class="line">                        requestHeader.getTopic(), requestHeader.getQueueId(),</span><br><span class="line">                        (<span class="keyword">int</span>) (<span class="keyword">this</span>.brokerController.getMessageStore().now() - beginTimeMills));</span><br><span class="line">                    response.setBody(r);</span><br><span class="line">                &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">                    <span class="keyword">try</span> &#123;</span><br><span class="line">                        FileRegion fileRegion =</span><br><span class="line">                            <span class="keyword">new</span> ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);</span><br><span class="line">                        channel.writeAndFlush(fileRegion).addListener(<span class="keyword">new</span> ChannelFutureListener() &#123;</span><br><span class="line">                            <span class="meta">@Override</span></span><br><span class="line">                            <span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">operationComplete</span><span class="params">(ChannelFuture future)</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">                                getMessageResult.release();</span><br><span class="line">                                <span class="keyword">if</span> (!future.isSuccess()) &#123;</span><br><span class="line">                                    log.error(<span class="string">"transfer many message by pagecache failed, &#123;&#125;"</span>, channel.remoteAddress(), future.cause());</span><br><span class="line">                                
&#125;</span><br><span class="line">                            &#125;</span><br><span class="line">                        &#125;);</span><br><span class="line">                    &#125; <span class="keyword">catch</span> (Throwable e) &#123;</span><br><span class="line">                        log.error(<span class="string">"transfer many message by pagecache exception"</span>, e);</span><br><span class="line">                        getMessageResult.release();</span><br><span class="line">                    &#125;</span><br><span class="line"></span><br><span class="line">                    response = <span class="keyword">null</span>;</span><br><span class="line">                &#125;</span><br><span class="line">                <span class="keyword">break</span>;</span><br><span class="line">            <span class="keyword">case</span> ResponseCode.PULL_NOT_FOUND:</span><br><span class="line"></span><br><span class="line">                <span class="keyword">if</span> (brokerAllowSuspend &amp;&amp; hasSuspendFlag) &#123;</span><br><span class="line">                    <span class="keyword">long</span> pollingTimeMills = suspendTimeoutMillisLong;</span><br><span class="line">                    <span class="keyword">if</span> (!<span class="keyword">this</span>.brokerController.getBrokerConfig().isLongPollingEnable()) &#123;</span><br><span class="line">                        pollingTimeMills = <span class="keyword">this</span>.brokerController.getBrokerConfig().getShortPollingTimeMills();</span><br><span class="line">                    &#125;</span><br><span class="line"></span><br><span class="line">                    String topic = requestHeader.getTopic();</span><br><span class="line">                    <span class="keyword">long</span> offset = requestHeader.getQueueOffset();</span><br><span class="line">                    <span class="keyword">int</span> queueId = requestHeader.getQueueId();</span><br><span class="line">                    
PullRequest pullRequest = <span class="keyword">new</span> PullRequest(request, channel, pollingTimeMills,</span><br><span class="line">                        <span class="keyword">this</span>.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);</span><br><span class="line">                    <span class="keyword">this</span>.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);</span><br><span class="line">                    response = <span class="keyword">null</span>;</span><br><span class="line">                    <span class="keyword">break</span>;</span><br><span class="line">                &#125;</span><br><span class="line"></span><br><span class="line">            <span class="keyword">case</span> ResponseCode.PULL_RETRY_IMMEDIATELY:</span><br><span class="line">                <span class="keyword">break</span>;</span><br><span class="line">            <span class="keyword">case</span> ResponseCode.PULL_OFFSET_MOVED:</span><br><span class="line">                <span class="keyword">if</span> (<span class="keyword">this</span>.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE</span><br><span class="line">                    || <span class="keyword">this</span>.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) &#123;</span><br><span class="line">                    MessageQueue mq = <span class="keyword">new</span> MessageQueue();</span><br><span class="line">                    mq.setTopic(requestHeader.getTopic());</span><br><span class="line">                    mq.setQueueId(requestHeader.getQueueId());</span><br><span class="line">                    mq.setBrokerName(<span class="keyword">this</span>.brokerController.getBrokerConfig().getBrokerName());</span><br><span class="line"></span><br><span class="line">                    OffsetMovedEvent event = <span class="keyword">new</span> OffsetMovedEvent();</span><br><span class="line"> 
                   event.setConsumerGroup(requestHeader.getConsumerGroup());</span><br><span class="line">                    event.setMessageQueue(mq);</span><br><span class="line">                    event.setOffsetRequest(requestHeader.getQueueOffset());</span><br><span class="line">                    event.setOffsetNew(getMessageResult.getNextBeginOffset());</span><br><span class="line">                    <span class="keyword">this</span>.generateOffsetMovedEvent(event);</span><br><span class="line">                    log.warn(</span><br><span class="line">                        <span class="string">"PULL_OFFSET_MOVED:correction offset. topic=&#123;&#125;, groupId=&#123;&#125;, requestOffset=&#123;&#125;, newOffset=&#123;&#125;, suggestBrokerId=&#123;&#125;"</span>,</span><br><span class="line">                        requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),</span><br><span class="line">                        responseHeader.getSuggestWhichBrokerId());</span><br><span class="line">                &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());</span><br><span class="line">                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);</span><br><span class="line">                    log.warn(<span class="string">"PULL_OFFSET_MOVED:none correction. 
topic=&#123;&#125;, groupId=&#123;&#125;, requestOffset=&#123;&#125;, suggestBrokerId=&#123;&#125;"</span>,</span><br><span class="line">                        requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),</span><br><span class="line">                        responseHeader.getSuggestWhichBrokerId());</span><br><span class="line">                &#125;</span><br><span class="line"></span><br><span class="line">                <span class="keyword">break</span>;</span><br><span class="line">            <span class="keyword">default</span>:</span><br><span class="line">                <span class="keyword">assert</span> <span class="keyword">false</span>;</span><br><span class="line">        &#125;</span><br><span class="line">    &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">        response.setCode(ResponseCode.SYSTEM_ERROR);</span><br><span class="line">        response.setRemark(<span class="string">"store getMessage return null"</span>);</span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line">    <span class="keyword">boolean</span> storeOffsetEnable = brokerAllowSuspend;</span><br><span class="line">    storeOffsetEnable = storeOffsetEnable &amp;&amp; hasCommitOffsetFlag;</span><br><span class="line">    storeOffsetEnable = storeOffsetEnable</span><br><span class="line">        &amp;&amp; <span class="keyword">this</span>.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;</span><br><span class="line">    <span class="keyword">if</span> (storeOffsetEnable) &#123;</span><br><span class="line">        <span class="keyword">this</span>.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),</span><br><span class="line">            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());</span><br><span 
class="line">    &#125;</span><br><span class="line">    <span class="keyword">return</span> response;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure></p>
<ol>
<li>如果拉取成功，统计新增当前group获取的该topic的消息数、消息大小、以及broker获取的消息数等等。</li>
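<li>根据BrokerConfig的<strong>transferMsgByHeap</strong>判断是否启用零拷贝技术传输数据，默认不启用。transferMsgByHeap为true时不启用零拷贝：先把消息读取成堆内存中的byte数组，再设置到response的body中返回；transferMsgByHeap为false时启用零拷贝：这里会将响应头和消息封装成FileRegion（ManyMessageTransfer），直接写入channel发送，并将response置为null。FileRegion是netty中零拷贝传输文件的接口。</li>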
<li>如果没有未消费的消息（PULL_NOT_FOUND），且brokerAllowSuspend为真、请求头中sysFlag对应FLAG_SUSPEND位也为真，本次请求将会被挂起。如果BrokerConfig对应的<strong>longPollingEnable</strong>为真，即支持长轮询，本次请求被挂起的时间由请求自身附带的suspendTimeoutMillis参数决定，默认pull模式下为20s，push模式下为15s。如果不支持长轮询，pollingTimeMills取BrokerConfig的<strong>shortPollingTimeMills</strong>，默认为1s。</li>
</ol>

      
    </div>
    
    
    

    

    

    

    <footer class="post-footer">
      
        <div class="post-tags">
          
            <a href="/tags/RocketMQ之旅/" rel="tag"># RocketMQ之旅</a>
          
        </div>
      

      
      
      

      
        <div class="post-nav">
          <div class="post-nav-next post-nav-item">
            
              <a href="/2018/12/17/算法国（二）字符串匹配/" rel="next" title="算法国（二）字符串匹配">
                <i class="fa fa-chevron-left"></i> 算法国（二）字符串匹配
              </a>
            
          </div>

          <span class="post-nav-divider"></span>

          <div class="post-nav-prev post-nav-item">
            
              <a href="/2019/02/02/谈谈高并发之分页缓存的设计/" rel="prev" title="谈谈高并发之分页缓存的设计">
                谈谈高并发之分页缓存的设计 <i class="fa fa-chevron-right"></i>
              </a>
            
          </div>
        </div>
      

      
      
    </footer>
  </div>
  
  
  
  </article>



    <div class="post-spread">
      
    </div>
  </div>


          </div>
          


          

  
    <div class="comments" id="comments">
      <div id="disqus_thread">
        <noscript>
          Please enable JavaScript to view the
          <a href="https://disqus.com/?ref_noscript">comments powered by Disqus.</a>
        </noscript>
      </div>
    </div>

  



        </div>
        
          
  
  <div class="sidebar-toggle">
    <div class="sidebar-toggle-line-wrap">
      <span class="sidebar-toggle-line sidebar-toggle-line-first"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-middle"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-last"></span>
    </div>
  </div>

  <aside id="sidebar" class="sidebar">
    
    <div class="sidebar-inner">

      

      
        <ul class="sidebar-nav motion-element">
          <li class="sidebar-nav-toc sidebar-nav-active" data-target="post-toc-wrap">
            文章目录
          </li>
          <li class="sidebar-nav-overview" data-target="site-overview-wrap">
            站点概览
          </li>
        </ul>
      

      <section class="site-overview-wrap sidebar-panel">
        <div class="site-overview">
          <div class="site-author motion-element" itemprop="author" itemscope itemtype="http://schema.org/Person">
            
              <p class="site-author-name" itemprop="name">matrix</p>
              <p class="site-description motion-element" itemprop="description"></p>
          </div>

          <nav class="site-state motion-element">

            
              <div class="site-state-item site-state-posts">
              
                <a href="/archives">
              
                  <span class="site-state-item-count">47</span>
                  <span class="site-state-item-name">日志</span>
                </a>
              </div>
            

            
              
              
              <div class="site-state-item site-state-categories">
                <a href="/categories/index.html">
                  <span class="site-state-item-count">18</span>
                  <span class="site-state-item-name">分类</span>
                </a>
              </div>
            

            
              
              
              <div class="site-state-item site-state-tags">
                
                  <span class="site-state-item-count">19</span>
                  <span class="site-state-item-name">标签</span>
                
              </div>
            

          </nav>

          

          

          
          

          
          

          

        </div>
      </section>

      
      <!--noindex-->
        <section class="post-toc-wrap motion-element sidebar-panel sidebar-panel-active">
          <div class="post-toc">

            
              
            

            
              <div class="post-toc-content"><ol class="nav"><li class="nav-item nav-level-4"><a class="nav-link" href="#消息接收"><span class="nav-number">1.</span> <span class="nav-text">消息接收</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#消息消费"><span class="nav-number">2.</span> <span class="nav-text">消息消费</span></a></li></ol></div>
            

          </div>
        </section>
      <!--/noindex-->
      

      

    </div>
  </aside>


        
      </div>
    </main>

    <footer id="footer" class="footer">
      <div class="footer-inner">
        <div class="copyright">&copy; <span itemprop="copyrightYear">2021</span>
  <span class="with-love">
    <i class="fa fa-user"></i>
  </span>
  <span class="author" itemprop="copyrightHolder">matrix</span>

  
</div>


  <div class="powered-by">由 <a class="theme-link" target="_blank" href="https://hexo.io">Hexo</a> 强力驱动</div>



  <span class="post-meta-divider">|</span>



  <div class="theme-info">主题 &mdash; <a class="theme-link" target="_blank" href="https://github.com/iissnan/hexo-theme-next">NexT.Pisces</a> v5.1.4</div>
<!--
  ICP filing notice with a link to the MIIT filing site.
  The class attribute was missing its opening quote, which made the stray
  closing quote part of the class token; it is now quoted properly. An
  unmatched closing anchor tag after the link has also been removed.
  NOTE(review): the class name itself is kept as written; confirm it matches
  the selector used in the stylesheet.
-->
<div class="BbeiAn-info">
  浙ICP备 -
  <a href="http://www.beian.miit.gov.cn">18040498号</a>
</div>




        







        
      </div>
    </footer>

    
      <!-- Back-to-top control; its only content is an upward arrow icon. -->
      <div class="back-to-top">
        <i class="fa fa-arrow-up"></i>
      </div>
    

    

  </div>

  

<script>
  // Keep window.Promise only when its Object.prototype.toString tag is
  // "[object Function]"; for any other value, overwrite it with null.
  (function (win) {
    var promiseTag = Object.prototype.toString.call(win.Promise);
    if (promiseTag !== '[object Function]') {
      win.Promise = null;
    }
  })(window);
</script>









  












  
  
    <!--
      Vendored libraries. These are classic scripts (no async/defer), so they
      execute in the order listed; the order is unchanged.
    -->
    <script src="/lib/jquery/index.js?v=2.1.3"></script>
    <script src="/lib/fastclick/lib/fastclick.min.js?v=1.0.6"></script>
    <script src="/lib/jquery_lazyload/jquery.lazyload.js?v=1.9.7"></script>
    <script src="/lib/velocity/velocity.min.js?v=1.2.1"></script>
    <script src="/lib/velocity/velocity.ui.min.js?v=1.2.1"></script>
    <script src="/lib/fancybox/source/jquery.fancybox.pack.js?v=2.1.5"></script>

    <!-- Theme scripts (v5.1.4), also in their original order; bootstrap.js stays last. -->
    <script src="/js/src/utils.js?v=5.1.4"></script>
    <script src="/js/src/motion.js?v=5.1.4"></script>
    <script src="/js/src/affix.js?v=5.1.4"></script>
    <script src="/js/src/schemes/pisces.js?v=5.1.4"></script>
    <script src="/js/src/scrollspy.js?v=5.1.4"></script>
    <script src="/js/src/post-details.js?v=5.1.4"></script>
    <script src="/js/src/bootstrap.js?v=5.1.4"></script>



  


  

    
      <!-- Disqus comment-count script, fetched asynchronously. -->
      <script id="dsq-count-scr" src="https://luyun.disqus.com/count.js" async></script>

      <script type="text/javascript">
        // Page identity (URL, identifier, title) set on this.page for the Disqus embed.
        // It must stay a global so the embed script can find it.
        // NOTE(review): the URL host is "yoursite.com", which looks like an
        // unconfigured placeholder. Confirm the site URL in the generator config.
        var disqus_config = function () {
          this.page.url = 'http://yoursite.com/2019/02/01/RocketMQ之旅（七）broker处理消息/';
          this.page.identifier = '2019/02/01/RocketMQ之旅（七）broker处理消息/';
          this.page.title = 'RocketMQ之旅（七）broker处理消息';
        };

        // Inject the embed loader from inside an IIFE so the helper variables
        // `d` and `s` do not become properties of the global object.
        (function () {
          var d = document, s = d.createElement('script');
          s.src = 'https://luyun.disqus.com/embed.js';
          // Stamp the element with the current time in milliseconds, as a string.
          s.setAttribute('data-timestamp', '' + +new Date());
          (d.head || d.body).appendChild(s);
        })();
      </script>
    

  




	





  














  





  

  

  

  
  

  

  

  

</body>
</html>
